package com.example.binlog.binlog;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.DeleteRowsEventData;
import com.github.shyiko.mysql.binlog.event.EventData;
import com.github.shyiko.mysql.binlog.event.TableMapEventData;
import com.github.shyiko.mysql.binlog.event.UpdateRowsEventData;
import com.github.shyiko.mysql.binlog.event.WriteRowsEventData;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

/**
 * @author ghl
 * @version 1.0
 * @date 2022/8/16 9:57
 */
@Component
public class MysqlBinLogClient implements ApplicationRunner {

    private static final Logger log = LoggerFactory.getLogger(MysqlBinLogClient.class);

    @Value("${mysql.host}")
    private String host;
    @Value("${mysql.port}")
    private Integer port;
    @Value("${mysql.username}")
    private String userName;
    @Value("${mysql.password}")
    private String password;


    @Override
    public void run(ApplicationArguments args) throws Exception {
        //项目启动完成连接bin-log
        log.info("项目启动完成连接bin-log");
        new Thread(() -> {
            connectMysqlBinLog();
        }).start();

    }

    /**
     * 连接mysqlBinLog
     */
    public void connectMysqlBinLog() {
        BinaryLogClient client = new BinaryLogClient(host, port, userName, password);
        client.setServerId(2);

        client.registerEventListener(event -> {
            EventData data = event.getData();
            if (data instanceof TableMapEventData) {
                System.out.println("Table:");
                TableMapEventData tableMapEventData = (TableMapEventData) data;
                System.out.println(tableMapEventData.getTableId() + ": [" + tableMapEventData.getDatabase() + "-" + tableMapEventData.getTable() + "]");
            }
            if (data instanceof UpdateRowsEventData) {
                System.out.println("Update:");
                System.out.println(data.toString());

                UpdateRowsEventData updateRowsEventData = (UpdateRowsEventData) data;
                List<Map.Entry<Serializable[], Serializable[]>> rows = updateRowsEventData.getRows();

                for (Map.Entry<Serializable[], Serializable[]> row : rows) {
                    List<Serializable> entries = Arrays.asList(row.getValue());
                    System.out.println(entries);
                    JSONObject dataObject = getDataObject(entries);
                    System.out.println(dataObject);
                }

            } else if (data instanceof WriteRowsEventData) {
                System.out.println("Insert:");
                System.out.println(data.toString());
            } else if (data instanceof DeleteRowsEventData) {
                System.out.println("Delete:");
                System.out.println(data.toString());
            }
        });

        try {
            client.connect();
        } catch (IOException e) {
            e.printStackTrace();
        }

    }

    private static JSONObject getDataObject(List message) {
        JSONObject resultObject = new JSONObject();
        String format = "{\"id\":\"0\",\"name\":\"1\",\"age\":\"2\",\"create_time\":\"3\",\"update_time\":\"4\"}";
        JSONObject json = JSON.parseObject(format);
        for (String key : json.keySet()) {
            resultObject.put(key, message.get(json.getInteger(key)));
        }
        return resultObject;
    }


}
